refactor(source): prefer SpecificParserConfig over SourceStruct #12602

Merged: 4 commits into main from refactor-source-struct-format-encode, Oct 21, 2023

Conversation

xiangjinwu
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Preparation for the following goal:

  • consolidate enum ProtocolProperties and enum SourceFormat into a single enum ConnectorFormat (or one derived in the same way as the encode enum below, when needed).
  • consolidate enum EncodingProperties and enum SourceEncode into enum ConnectorEncode with derived EnumDiscriminants.
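
To illustrate the relationship between a full-properties enum and its discriminants, here is a minimal sketch. The variant fields are hypothetical and are not taken from the RisingWave code. The data-free enum is written by hand so the example has no dependencies; strum's `#[derive(EnumDiscriminants)]` is the derive the description refers to and can generate a comparable type.

```rust
/// Hypothetical consolidated enum: each variant carries the properties a
/// parser would need. Field names are illustrative assumptions.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectorEncode {
    Json { use_schema_registry: bool },
    Avro { schema_location: String, use_schema_registry: bool },
    Protobuf { schema_location: String, message_name: String },
}

/// Data-free mirror of `ConnectorEncode`, hand-written for this sketch.
/// It is cheap to copy and compare when only the kind of encoding matters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectorEncodeDiscriminants {
    Json,
    Avro,
    Protobuf,
}

impl From<&ConnectorEncode> for ConnectorEncodeDiscriminants {
    fn from(encode: &ConnectorEncode) -> Self {
        // Drop the payload of each variant and keep only its tag.
        match encode {
            ConnectorEncode::Json { .. } => Self::Json,
            ConnectorEncode::Avro { .. } => Self::Avro,
            ConnectorEncode::Protobuf { .. } => Self::Protobuf,
        }
    }
}
```

A caller that only needs to branch on the encoding can convert with `ConnectorEncodeDiscriminants::from(&encode)` and match on the result, while parsers keep the full value.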

In this first part, we observe that most callers always call these two functions in sequence:

  • extract_source_struct: PbStreamSourceInfo -> SourceStruct
  • SpecificParserConfig::new: (SourceStruct, PbStreamSourceInfo, HashMap<String, String>) -> Self

So we merge them into a single call that does not expose the intermediate SourceStruct:

  • SpecificParserConfig::new: (PbStreamSourceInfo, HashMap<String, String>) -> Self
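
The signature change above can be sketched as follows. All types here are simplified stand-ins, not the real RisingWave definitions: `SourceInfo` plays the role of PbStreamSourceInfo, the numeric codes and the `schema.registry` property key are assumptions made for illustration, and the real `SpecificParserConfig` holds far more than these three fields.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format { Plain, Upsert }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Encode { Json, Avro }

/// Stand-in for PbStreamSourceInfo: enums travel as i32 codes (assumed values).
pub struct SourceInfo {
    pub format: i32,
    pub row_encode: i32,
}

/// Stand-in for the intermediate SourceStruct. It is private, so callers
/// can no longer construct one and pass it in.
struct SourceStruct {
    format: Format,
    encode: Encode,
}

/// Decodes the i32 codes; private because only `new` needs it now.
fn extract_source_struct(info: &SourceInfo) -> Result<SourceStruct, String> {
    let format = match info.format {
        1 => Format::Plain,
        2 => Format::Upsert,
        other => return Err(format!("unknown format code {other}")),
    };
    let encode = match info.row_encode {
        1 => Encode::Json,
        2 => Encode::Avro,
        other => return Err(format!("unknown encode code {other}")),
    };
    Ok(SourceStruct { format, encode })
}

#[derive(Debug, PartialEq)]
pub struct SpecificParserConfig {
    pub format: Format,
    pub encode: Encode,
    pub use_schema_registry: bool,
}

impl SpecificParserConfig {
    /// Single entry point: the format and encode always come from `info`,
    /// so a caller cannot supply a pair that disagrees with it.
    pub fn new(info: &SourceInfo, props: &HashMap<String, String>) -> Result<Self, String> {
        let source_struct = extract_source_struct(info)?;
        Ok(Self {
            format: source_struct.format,
            encode: source_struct.encode,
            use_schema_registry: props.contains_key("schema.registry"),
        })
    }
}
```

With the two-argument form, the (format, encode) pair is derived in exactly one place, which is what removes the opportunity for a caller to pass a pair that differs from the one recorded in the source info.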

Other places using SourceStruct:

  • FsSourceDesc does not use source_struct. It already contains SpecificParserConfig inside FsConnectorSource.
  • DatagenEventGenerator only supports json today. When it supports more encodings later, it will need the full properties; enum discriminants alone will not be enough.
  • handle_alter_source_column rejects avro or protobuf based on discriminant alone. It also rejects json with schema registry. It is unclear whether we need discriminant alone or the full properties. This usage has not been changed by this PR.
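
As a rough illustration of the last item, the check below shows why the discriminant alone is not quite sufficient for that handler: the json case also depends on one property. The function name, enum, and error messages are hypothetical and do not reproduce the actual handler.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeKind {
    Json,
    Avro,
    Protobuf,
}

/// Hypothetical model of the rejection rules described above.
pub fn check_alter_source_column(
    encode: EncodeKind,
    use_schema_registry: bool,
) -> Result<(), String> {
    match encode {
        // Rejected based on the discriminant alone.
        EncodeKind::Avro | EncodeKind::Protobuf => {
            Err(format!("cannot alter columns of a {:?} source", encode))
        }
        // Rejected only when a property says a schema registry is in use.
        EncodeKind::Json if use_schema_registry => {
            Err("cannot alter columns of a json source with schema registry".to_string())
        }
        EncodeKind::Json => Ok(()),
    }
}
```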

In the second part, we will do trivial renames to consolidate the enums as mentioned above.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)


@codecov

codecov bot commented Oct 2, 2023

Codecov Report

Merging #12602 (7cc8e0c) into main (6833305) will increase coverage by 0.01%.
The diff coverage is 53.48%.

@@            Coverage Diff             @@
##             main   #12602      +/-   ##
==========================================
+ Coverage   68.76%   68.77%   +0.01%     
==========================================
  Files        1495     1495              
  Lines      250159   250092      -67     
==========================================
- Hits       172031   172011      -20     
+ Misses      78128    78081      -47     
Flag Coverage Δ
rust 68.77% <53.48%> (+0.01%) ⬆️


Files Coverage Δ
src/connector/src/parser/avro/parser.rs 68.38% <100.00%> (-0.15%) ⬇️
src/connector/src/parser/mod.rs 46.36% <100.00%> (+1.53%) ⬆️
src/connector/src/parser/protobuf/parser.rs 86.17% <100.00%> (-0.25%) ⬇️
...c/connector/src/source/datagen/source/generator.rs 88.70% <100.00%> (+0.57%) ⬆️
src/connector/src/source/datagen/source/reader.rs 82.69% <100.00%> (ø)
src/frontend/src/handler/alter_source_column.rs 85.38% <ø> (ø)
src/batch/src/executor/source.rs 0.00% <0.00%> (ø)
src/connector/src/parser/debezium/avro_parser.rs 46.36% <0.00%> (+0.41%) ⬆️
src/frontend/src/handler/create_source.rs 49.37% <42.85%> (+0.70%) ⬆️
src/source/src/source_desc.rs 56.00% <20.00%> (+6.00%) ⬆️
... and 1 more

... and 3 files with indirect coverage changes


Comment on lines -103 to +102
-     SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro),
-     info,
-     with_properties,
- )?;
+ let parser_config = SpecificParserConfig::new(info, with_properties)?;
Contributor Author

@xiangjinwu xiangjinwu Oct 2, 2023

One of the callers is actually passing in UpsertAvro today. Is the caller supposed to call extract_upsert_avro_table_schema?

It does not affect correctness right now because only the encoding_properties field is used, and SpecificParserConfig::new handles UpsertAvro and PlainAvro the same way for encoding.

        let stream_source_info = StreamSourceInfo {
            key_message_name,
            format: FormatType::Upsert as i32,
            row_encode: EncodeType::Avro as i32,
            row_schema_location: avro_schema.row_schema_location.0.clone(),
            use_schema_registry: avro_schema.use_schema_registry,
            proto_message_name: message_name.unwrap_or(AstString("".into())).0,
            upsert_avro_primary_key,
            name_strategy,
            ..Default::default()
        };
        let columns =
            extract_avro_table_schema(&stream_source_info, with_properties).await?;
        (Some(columns), sql_defined_pk_names, stream_source_info)
    } else {
        let stream_source_info = StreamSourceInfo {
            format: FormatType::Upsert as i32,
            row_encode: EncodeType::Avro as i32,
            row_schema_location: avro_schema.row_schema_location.0.clone(),
            use_schema_registry: avro_schema.use_schema_registry,
            proto_message_name: message_name.unwrap_or(AstString("".into())).0,
            name_strategy,
            key_message_name,
            ..Default::default()
        };
        let (columns, pk_from_avro) =
            extract_upsert_avro_table_schema(&stream_source_info, with_properties).await?;

Contributor Author

This dates back to the first day upsert was supported: #8111

@fuyufjh fuyufjh requested review from fuyufjh and tabVersion October 3, 2023 07:39
Member

@fuyufjh fuyufjh left a comment

The motivation LGTM

Contributor

@tabVersion tabVersion left a comment

overall lgtm

@xiangjinwu
Contributor Author

Let's hold this PR for a bit until we figure out whether the inconsistency above is intentional.

let source_struct = extract_source_struct(&self.source_info)?;
let psrser_config =
SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?;
let psrser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?;
Contributor

Suggested change
- let psrser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?;
+ let parser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?;

Contributor

@neverchanje neverchanje left a comment

All updates seem bugless. LGTM.

@xiangjinwu
Contributor Author

According to a recent offline discussion, the special path of upsert-avro-with-sql_defined_pk pretending to be plain-avro is going to be removed. This refactor will be rebased after that is done.

@xiangjinwu xiangjinwu enabled auto-merge October 21, 2023 07:51
@xiangjinwu xiangjinwu added this pull request to the merge queue Oct 21, 2023
Merged via the queue into main with commit 19643ad Oct 21, 2023
7 of 8 checks passed
@xiangjinwu xiangjinwu deleted the refactor-source-struct-format-encode branch October 21, 2023 08:17
@xiangjinwu
Contributor Author

PR merged after digging and confirming an answer for the following question:

One of the callers is actually passing in UpsertAvro today. Is the caller supposed to call extract_upsert_avro_table_schema?

TLDR:

  • Before this PR, the relevant code path allowed certain create source/table statements to succeed, even though the source parser then constantly threw errors that were suppressed.
  • After this PR, such create source/table statements fail with the corresponding source parser error.

Back to the question, there were 3 options:

  1. (Before this PR) call extract_avro_table_schema, which pretends to be (plain, avro).
  2. (After this PR) call extract_avro_table_schema, which uses (upsert, avro) from passed source_info.
  3. (Question above) call extract_upsert_avro_table_schema.

The call is made for upsert avro with a user-defined primary key in SQL.

  • extract_upsert_avro_table_schema is essentially extract_avro_table_schema followed by key validation/mutation.
    • When a key schema is defined, it ensures the key columns are a subset of the value columns.
    • When a key schema is not defined, it adds an _rw_key column in addition to the value columns.
    • This extra behavior is not intended for the upsert-avro-with-user-defined-primary-key-in-sql case. So the 3rd option is not what we want.
  • SpecificParserConfig::new does handle upsert avro and plain avro differently.
    • On upsert, it requires use of schema registry, and always fetches both key and value schema[1].
    • On plain, it allows both schema registry and file/https/s3 URLs[2]. It only fetches value schema.
    • Note that when using schema registry with key schema in schema registry, there is essentially no difference.
  • SpecificParserConfig::new is called at least twice, once in the frontend and once in the compute source executor.
    • The frontend call is our focus here, which either (1) pretends to be (plain, avro) or (2) uses (upsert, avro) from passed source_info.
    • The compute source executor call has always been using (upsert, avro) from passed source_info.
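
The key validation/mutation step described in the first bullet can be sketched roughly as below. Columns are reduced to plain names, the function name is hypothetical, and returning `_rw_key` as the primary key in the no-key-schema branch is an assumption of this sketch rather than something stated in this thread.

```rust
/// Name of the extra key column mentioned above.
const RW_KEY: &str = "_rw_key";

/// Hypothetical model: takes the value columns (already extracted from the
/// avro value schema) and optional key columns (from the key schema), and
/// returns (table columns, primary key columns).
pub fn upsert_avro_table_schema(
    value_columns: Vec<String>,
    key_columns: Option<Vec<String>>,
) -> Result<(Vec<String>, Vec<String>), String> {
    match key_columns {
        // Key schema defined: every key column must also be a value column.
        Some(keys) => {
            if let Some(missing) = keys.iter().find(|k| !value_columns.contains(*k)) {
                return Err(format!(
                    "key column `{}` is not among the value columns",
                    missing
                ));
            }
            Ok((value_columns, keys))
        }
        // No key schema: append the extra key column to the value columns.
        None => {
            let mut columns = value_columns;
            columns.push(RW_KEY.to_string());
            Ok((columns, vec![RW_KEY.to_string()]))
        }
    }
}
```

Neither branch fits a table whose primary key is already declared in SQL, which is consistent with ruling out the 3rd option.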

As a result, in cases where SpecificParserConfig::new does behave differently, for example when not using schema registry[2] or when the key schema is not in schema registry[1], the frontend call would succeed but the compute source executor call would fail. No data would be ingested and it would retry with warnings forever[3]. By changing from (1) to (2) in this PR, we are able to report such errors in the frontend.

[1] Schema registry does allow the key schema to be absent. In most parts of the docs, StringSerializer (rather than avro/proto/json) is used for the key. Requiring a key schema, especially when the user defines the primary key in SQL, might be a bug.
[2] The current implementation of avro without schema registry is not practical due to #12871
[3] We tolerate errors in source (src). However, the actual error was not included in the warning message, but used as a metrics label. This makes the warning less useful, and the metrics cardinality potentially huge.
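
The frontend/compute mismatch explained above can be modeled with a small sketch. It encodes only the two rules stated in this comment (upsert requires schema registry and a key schema; plain accepts either location and fetches only the value schema). All names are hypothetical, and the real SpecificParserConfig::new does considerably more.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Format {
    Plain,
    Upsert,
}

/// Hypothetical description of an avro source as declared by the user.
pub struct AvroSource {
    pub format: Format,
    pub use_schema_registry: bool,
    pub key_schema_in_registry: bool,
}

/// Model of the validation performed when building the parser config.
pub fn build_avro_config(format: Format, src: &AvroSource) -> Result<(), String> {
    match format {
        // Plain: registry or file/https/s3 URL, value schema only.
        Format::Plain => Ok(()),
        // Upsert: registry is required and the key schema is always fetched.
        Format::Upsert => {
            if !src.use_schema_registry {
                return Err("upsert avro requires schema registry".to_string());
            }
            if !src.key_schema_in_registry {
                return Err("key schema not found in schema registry".to_string());
            }
            Ok(())
        }
    }
}

/// Option 1 (before this PR): the frontend pretends to be (plain, avro).
pub fn frontend_check_before(src: &AvroSource) -> Result<(), String> {
    build_avro_config(Format::Plain, src)
}

/// Option 2 (after this PR): the frontend uses the format from the source info.
pub fn frontend_check_after(src: &AvroSource) -> Result<(), String> {
    build_avro_config(src.format, src)
}

/// The compute source executor has always used the format from the source info.
pub fn compute_check(src: &AvroSource) -> Result<(), String> {
    build_avro_config(src.format, src)
}
```

For an upsert source without schema registry, `frontend_check_before` returns `Ok` while `compute_check` returns an error, which is the silent failure described above; `frontend_check_after` returns the same error as the compute side, so the statement is rejected up front.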
